Codecov Report
@@ Coverage Diff @@
## master #208 +/- ##
==========================================
- Coverage 62.03% 61.58% -0.45%
==========================================
Files 85 83 -2
Lines 4104 4056 -48
==========================================
- Hits 2546 2498 -48
- Misses 1235 1237 +2
+ Partials 323 321 -2
The new runner is 91% covered by tests, but since more code is deleted than added, overall coverage goes down.
Clarified the description a bit, on feedback from @rihter007.
I haven't started the full review yet; it will take a while. What this is missing is event emissions. This is duplication of how we keep track of the state, and we should converge on using the same approach both for resume and for Status queries.
@marcoguerri thanks! indeed, i did not pay sufficient attention to the events that should be emitted. this should be fixed now - i added TargetIn, TargetOut, TargetErr and made sure a TestError is emitted in the right cases.
4fa6f44 to ec4325d
Mostly nits. Since it is a super large change, it would be good if someone else would also read it as well.
(I did not read the unit tests)
pkg/runner/test_runner.go (outdated)
// wait until error channel is emptied too.
outCh = make(chan *target.Target)
tr.safeCloseErrCh(ss)
break
Suggested change: break → break loop
actually, on further reflection - no, we don't want to break out of the loop just yet. as the comment above says, we want to make sure error channel also gets processed, which is exactly why we replace outCh with a new one and wait for ss.errCh to be closed as well.
Ah, I wasn't sure what you wanted to break out of; I guess you meant the select then? Would continue loop be clearer in that case?
great idea, indeed that would make the intent more clear. done.
Simplified, added support for serializing state and resuming. Not used yet but will be.
Just a warning: the quality of my review was poor this time. It was mostly related to coding style (it is difficult for my brain to verify the code as it is).
P.S.: Also, the mutex logic looks dangerous in multiple ways. I guess it might cause a deadlock somewhere in the future (when somebody modifies the code).
well, this is overly general. the previous implementation didn't use mutexes but was a maze of channels that was impossible to understand.
Apologies for the late review. In fact, I am catching up only now, and I am also far from being done (for my reference, I shall resume from waitStepRunners). Even if this is pushed, I still want to finish leaving a first pass of comments today.
CurPhase targetStepPhase `json:"cur_phase"` // Current phase of step execution.

res error // Final result, if reached the end state.
resCh chan error // Channel used to communicate result by the step runner.
Is this the channel used to communicate between stepRunner and targetHandler? Could you please clarify that in the comment? I haven't read through the whole code yet and it's not clear.
// Wait for step runners and readers to exit.
if err := tr.waitStepRunners(ctx); err != nil {
	tr.log.Errorf("step runner error: %q, canceling", err)
	stepCancel()
Does waitStepRunners guarantee that all steps have returned? If so, stepCancel should not be necessary. If not, steps should be awaited if a cancellation signal is being propagated. We should keep track of any step that does not return within the timeout and flag it accordingly.
you are right, this is not needed as by this point all the step runners have either exited or we gave up waiting on them to exit.
tr.log.Debugf("  %d %s %v", i, ts, stepErr)
if ts.CurPhase == targetStepPhaseRun {
	inFlightTargets = append(inFlightTargets, ts)
	if stepErr != statectx.ErrPaused {
If there is even only one target that failed in any step, we are declaring the test not resumable (but continuing the loop after we declare resumeOk false)? It should be fine to fail a target in a step and still be able to resume the test.
stepState.runErr, aka stepErr, is not about target errors; it is:
// Runner error, returned from Run() or an error condition detected by the reader.
i.e. it's an error condition that aborts the entire run. The presence of such errors making the run non-resumable makes sense.
Well, ok, not sure why I missed that here we were referring to an error of the whole step.
There was a lot of room for simplification in the previous implementation, in particular:
This is just to say that, in my opinion, this solution has its own traps, as it makes the data flow implicit (via a mutex and a global structure) rather than explicit, via channels. No questioning that the previous solution could be simplified, of course, but I personally find this solution not significantly easier to follow than the previous version with some unnecessary complexity removed.
I have exactly the same impression.
@marcoguerri sorry, i should have waited for your approval before merging. but i created https://github.com/facebookincubator/contest/pull/212 so we can continue the review there. |
tr.log.Debugf("waiting for step runners to finish")
swch := make(chan struct{})
go func() {
	tr.mu.Lock()
This is an example of the indirect data path I was mentioning before.
Intuitively, I would imagine that tr.steps is protected by tr.mu. Indeed, this goroutine acquires mu throughout its whole execution, and I see it reads ss.<X> and then waits on tr.cond.Wait. Is somebody else, from outside, changing stepRunning, readerRunning, etc. while we hold mu?
From looking at the code, yes, but those writers also hold mu. So, I cannot wrap my head around how this can work.
this has to do with how waiting on sync.Cond works. when you want to wait for some internal state protected by a mutex to change (in this case stepRunning and readerRunning), you acquire the mutex, test the state and, if it is not to your satisfaction, invoke tr.cond.Wait(), which releases the mutex and suspends until signaled, at which point it re-locks and returns, letting you re-examine the state. we signal tr.cond when things change (runner exits, reader exits or target handler exits).
	err = nrerr
}
for _, ss := range tr.steps {
	if ss.stepRunning {
Previous implementation here would also issue a termination signal to the whole test in a further attempt to make the pipeline shut down.
the step runner context will eventually be canceled (via defer stepCancel())
// custom timeouts
func NewTestRunnerWithTimeouts(timeouts TestRunnerTimeouts) TestRunner {
	return TestRunner{timeouts: timeouts}

func (tr *TestRunner) safeCloseOutCh(ss *stepState) {
Looks like safeCloseOutCh and safeCloseErrCh can be merged together and accept an argument from outside.
yes, except channels have different types and i couldn't find a way to cast them to something common.
// At this point we may still have an error to report,
// wait until error channel is emptied too.
outCh = nil
tr.safeCloseErrCh(ss)
If outCh indicates that the test step is guaranteed to have returned, then whoever signaled completion by closing outCh should have also closed errCh. Responsibility for closing these channels should not be spread across multiple entities.
this will be simplified when we have one channel for results (as discussed). i'll leave it as is for now.
	break loop
case <-ss.errCh:
	break loop
case <-time.After(tr.shutdownTimeout):
This should not be necessary. It should be stepRunner guaranteeing to us that step termination is protected against shutdownTimeout. We shouldn't be doing it in multiple places.
Tried to finish review of at least